package com.gitee.yanfanvip;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.jgroups.JChannel;
import org.jgroups.protocols.BARRIER;
import org.jgroups.protocols.CENTRAL_LOCK;
import org.jgroups.protocols.COUNTER;
import org.jgroups.protocols.FD_ALL;
import org.jgroups.protocols.FD_SOCK;
import org.jgroups.protocols.FRAG2;
import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.MFC_NB;
import org.jgroups.protocols.MULTI_PING;
import org.jgroups.protocols.TCPPING;
import org.jgroups.protocols.TCP_NIO2;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.VERIFY_SUSPECT;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.protocols.pbcast.STATE_TRANSFER;
import org.jgroups.stack.Protocol;
import com.gitee.yanfanvip.cluster.ClusterDatabase;
import com.gitee.yanfanvip.cluster.RemoteCluster;
import com.gitee.yanfanvip.interfaces.Database;
import com.gitee.yanfanvip.util.IPHelp;

/**
 * Assembles the JGroups protocol stack for one cluster node and lazily creates
 * the {@link JChannel} and the {@link ClusterDatabase} that sits on top of it.
 *
 * <p>Typical use: construct with the cluster name, local port and a
 * comma-separated node list, optionally call {@link #addSite(String, String)},
 * then call {@link #build()}.
 *
 * <p>This class performs no synchronization; NOTE(review): callers sharing one
 * instance across threads should serialize access to {@link #getChannel()} and
 * {@link #build()}.
 */
public class ClusterBuild {
	
	/** Cluster name used when connecting the channel; overwritten by the constructor. */
	String cluster = "CLUSTER";
	/** Local bind port, also the default port for nodes listed without one; overwritten by the constructor. */
	int port = 11850;
	/** Raw node entries obtained by splitting the constructor argument on commas. */
	String[] nodes;
	
	/** Protocol instances in the order they are handed to the channel. */
	List<Protocol> protstack;
	/** Lazily created channel; {@code null} until {@link #getChannel()} is first called. */
	JChannel channel = null;
	/** Protocol that receives messages from remote clusters; created in {@link #initProtstack()}. */
	RemoteCluster remote = null;
	/** Database protocol looked up from the channel's stack; {@code null} until the channel is connected. */
	ClusterDatabase database = null;
	
	/**
	 * Creates a builder and prepares the protocol stack (no channel is opened yet).
	 *
	 * @param cluster name of the cluster to join
	 * @param port    local bind port, also used for node entries that carry no port
	 * @param nodes   comma-separated list of {@code host} or {@code host:port} entries
	 * @throws Exception declared for compatibility with existing callers
	 */
	public ClusterBuild(String cluster, int port, String nodes) throws Exception {
		this.cluster = cluster;
		this.port = port;
		this.nodes = nodes.split(",");
		initProtstack();
	}
	
	/**
	 * @return the cluster name this builder was created with
	 */
	public String getCluster() {
		return cluster;
	}
	
	/**
	 * Creates the TCP (NIO2) transport protocol bound to the given port.
	 *
	 * @param port local bind port; a port range of 5 is configured on top of it
	 * @return the configured transport protocol
	 */
	static TCP_NIO2 getTCP(int port) {
		TCP_NIO2 tcp = new TCP_NIO2();
		tcp.setBindAddress(IPHelp.getAddress());
		tcp.setBindPort(port).setPortRange(5).setBindToAllInterfaces(true);
		tcp.setValue("use_ip_addrs", true);
		return tcp;
	}
	
	/**
	 * Creates the TCPPING discovery protocol from a list of node entries.
	 *
	 * <p>Each entry is either {@code host} or {@code host:port}. Surrounding
	 * whitespace is ignored, so {@code "a:1, b:2"} is accepted.
	 *
	 * @param nodes       node entries, one host (optionally with port) per element
	 * @param defaultPort port used for entries that do not specify one
	 * @return the configured discovery protocol
	 * @throws NumberFormatException if an entry's port part is not a valid integer
	 */
	static TCPPING getPing(String[] nodes, int defaultPort){
		List<InetSocketAddress> address = new ArrayList<InetSocketAddress>(nodes.length);
		for (String node : nodes) {
			// Trim so that lists written as "a:1, b:2" do not yield hosts with leading spaces.
			String entry = node.trim();
			String[] adds = entry.split(":");
			if(adds.length > 1){
				address.add(new InetSocketAddress(adds[0].trim(), Integer.parseInt(adds[1].trim())));
			}else{
				address.add(new InetSocketAddress(entry, defaultPort));
			}
		}
		TCPPING PING = new TCPPING();
		PING.setInitialHosts(address);
		PING.setPortRange(5);
		return PING;
	}
	
	/**
	 * Builds the ordered protocol list (transport first, database last) and
	 * creates the {@link RemoteCluster} protocol used by {@link #addSite(String, String)}.
	 */
	private void initProtstack() {
		remote = new RemoteCluster(cluster);
		protstack = new ArrayList<>();
		protstack.add(getTCP(port));              // transport protocol
		protstack.add(remote);                    // receives messages from remote clusters
		protstack.add(getPing(nodes, port));      // discovery protocol
		protstack.add(new MULTI_PING());          // multi-protocol discovery
		protstack.add(new MERGE3());              // merges sub-clusters back into one after a split (e.g. network partition)
		protstack.add(new FD_SOCK());             // failure detection
		protstack.add(new FD_ALL());              // failure detection
		protstack.add(new VERIFY_SUSPECT());      // failure detection
		protstack.add(new BARRIER());             // used by state transfer: lets in-flight threads finish and blocks new ones so digest and state are taken together
		protstack.add(new NAKACK2());             // reliable delivery and FIFO ordering for messages sent to all nodes
		protstack.add(new UNICAST3());            // reliable delivery and FIFO ordering for point-to-point messages
		protstack.add(new STABLE());              // message stability
		protstack.add(new GMS());                 // group membership (joining of new members)
		protstack.add(new MFC_NB());              // flow control
		protstack.add(new FRAG2());               // fragments the payload only (headers excluded)
		protstack.add(new STATE_TRANSFER());      // state synchronization
		protstack.add(new COUNTER());             // atomic counter
		protstack.add(new CENTRAL_LOCK());        // lock
		protstack.add(new ClusterDatabase(cluster, port)); // database
	}
	
	/**
	 * Registers a remote site with the {@link RemoteCluster} protocol.
	 * A site whose name equals this builder's own cluster name is ignored.
	 *
	 * @param cluster name of the remote cluster
	 * @param service passed through to {@code RemoteCluster.addSite}
	 *                (presumably the remote endpoint — confirm against RemoteCluster)
	 * @return this builder, for chaining
	 * @throws Exception if {@code RemoteCluster.addSite} fails
	 */
	public ClusterBuild addSite(String cluster, String service) throws Exception {
		if(this.cluster.equals(cluster)) { return this; }
		remote.addSite(cluster, service);
		return this;
	}
	
	/**
	 * Returns the channel, creating and connecting it on demand.
	 *
	 * <p>Creation, connection and database lookup are independent steps, so a
	 * call that follows a failed connect attempt retries the connection and still
	 * resolves the database instead of leaving it {@code null}.
	 *
	 * @return the connected channel
	 * @throws Exception if the channel cannot be created or connected
	 */
	public JChannel getChannel() throws Exception{
		if(channel == null){
			JChannel created = new JChannel(protstack);
			created.setDiscardOwnMessages(true);
			channel = created;
		}
		if(!channel.isConnected()){
			channel.connect(cluster);
		}
		// Resolved outside the creation branch: an earlier connect failure must not
		// leave the database permanently unset once a retry succeeds.
		if(database == null){
			database = channel.getProtocolStack().findProtocol(ClusterDatabase.class);
		}
		return channel;
	}
	
	/**
	 * Ensures the channel is connected and returns the cluster database.
	 *
	 * @return the {@link ClusterDatabase} found in the channel's protocol stack
	 * @throws Exception if the channel cannot be created or connected
	 */
	public Database build() throws Exception {
		getChannel();
		return database;
	}
}
